{
  "nbformat": 4,
  "nbformat_minor": 0,
  "metadata": {
    "colab": {
      "provenance": []
    },
    "kernelspec": {
      "name": "python3",
      "display_name": "Python 3"
    },
    "language_info": {
      "name": "python"
    }
  },
  "cells": [
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "A7HK1Duzdnew"
      },
      "outputs": [],
      "source": [
        "# @title ###### Licensed to the Apache Software Foundation (ASF), Version 2.0 (the \"License\")\n",
        "\n",
        "# Licensed to the Apache Software Foundation (ASF) under one\n",
        "# or more contributor license agreements. See the NOTICE file\n",
        "# distributed with this work for additional information\n",
        "# regarding copyright ownership. The ASF licenses this file\n",
        "# to you under the Apache License, Version 2.0 (the\n",
        "# \"License\"); you may not use this file except in compliance\n",
        "# with the License. You may obtain a copy of the License at\n",
        "#\n",
        "#   http://www.apache.org/licenses/LICENSE-2.0\n",
        "#\n",
        "# Unless required by applicable law or agreed to in writing,\n",
        "# software distributed under the License is distributed on an\n",
        "# \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY\n",
        "# KIND, either express or implied. See the License for the\n",
        "# specific language governing permissions and limitations\n",
        "# under the License"
      ]
    },
    {
      "cell_type": "markdown",
      "source": [
        "# Use MLTransform to scale data\n",
        "\n",
        "<table align=\"left\">\n",
        "  <td>\n",
        "    <a target=\"_blank\" href=\"https://colab.research.google.com/github/apache/beam/blob/master/examples/notebooks/beam-ml/data_preprocessing/scale_data.ipynb\"><img src=\"https://raw.githubusercontent.com/google/or-tools/main/tools/colab_32px.png\" />Run in Google Colab</a>\n",
        "  </td>\n",
        "  <td>\n",
        "    <a target=\"_blank\" href=\"https://github.com/apache/beam/blob/master/examples/notebooks/beam-ml/data_preprocessing/scale_data.ipynb\"><img src=\"https://raw.githubusercontent.com/google/or-tools/main/tools/github_32px.png\" />View source on GitHub</a>\n",
        "  </td>\n",
        "</table>\n"
      ],
      "metadata": {
        "id": "ZUSiAR62SgO8"
      }
    },
    {
      "cell_type": "markdown",
      "source": [
        "Scaling data is an important preprocessing step for training machine learning (ML) models, because it helps to ensure that all features have a similar weight or influence on the model. The following are benefits of scaling data:\n",
        "\n",
        "* **Improved convergence of gradient descent algorithms:** Many machine learning algorithms, such as linear regression and neural networks, use gradient descent to optimize their parameters. Gradient descent iteratively moves the parameters of the model in the direction that reduces the loss function. If the features aren't scaled, features with larger ranges can have a much larger impact on the gradient, making it difficult for the model to converge. Scaling the features helps to ensure that all features contribute equally to the gradient, which can lead to faster and more stable convergence.\n",
        "\n",
        "* **Uniformity in features:** If one feature has a much larger range than the other features, it can dominate the model and make it difficult for the model to learn from the other features. This lack of uniformity can cause poor performance and biased predictions. Scaling the features brings all of the features into a similar range.\n",
        "\n",
        "\n",
        "To scale your dataset using Apache Beam, use `MLTransform` with one of the following transforms:\n",
        "\n",
        "* `ScaleTo01`: Calculates the minimum and maximum of an entire dataset, and then scales the dataset between 0 and 1 based on those values.\n",
        "* `ScaleToZScore`: Calculates the mean and variance of an entire dataset, and then scales the dataset by subtracting the mean and dividing by the standard deviation.\n",
        "* `ScaleByMinMax`: Scales the data in a dataset into the range `[min_value, max_value]`, where you provide both bounds as input parameters.\n",
        "\n",
        "For each data processing transform, `MLTransform` runs in both `write` mode and `read` mode. For more information about using `MLTransform`, see [Preprocess data with MLTransform](https://beam.apache.org/documentation/ml/preprocess-data/) in the Apache Beam documentation.\n",
        "\n",
        "## MLTransform in write mode\n",
        "\n",
        "When `MLTransform` is in `write` mode, it produces artifacts, such as the minimum, maximum, and variance, for the data processing transforms that you specify. Reusing these artifacts ensures that the same preprocessing is applied when you train your model, when you test its accuracy, and when you serve it in production.\n",
        "\n",
        "## MLTransform in read mode\n",
        "\n",
        "In `read` mode, `MLTransform` applies the artifacts generated in `write` mode to the dataset that it receives, without recomputing them."
      ],
      "metadata": {
        "id": "F8-yTcjPUHLv"
      }
    },
    {
      "cell_type": "markdown",
      "source": [
        "## Import the required modules\n",
        "\n",
        "To use `MLTransform`, install `tensorflow_transform` and the Apache Beam SDK version 2.53.0 or later."
      ],
      "metadata": {
        "id": "8qhhZTbIyqFb"
      }
    },
    {
      "cell_type": "code",
      "source": [
        "! pip install 'apache_beam[interactive]>=2.53.0' --quiet\n",
        "! pip install tensorflow-transform --quiet"
      ],
      "metadata": {
        "id": "cFP4CQksT_Zv"
      },
      "execution_count": null,
      "outputs": []
    },
    {
      "cell_type": "code",
      "source": [
        "import os\n",
        "import tempfile\n",
        "import apache_beam as beam\n",
        "from apache_beam.ml.transforms.base import MLTransform\n",
        "from apache_beam.ml.transforms.tft import ScaleTo01\n",
        "from apache_beam.ml.transforms.tft import ScaleByMinMax\n",
        "from apache_beam.ml.transforms.tft import ScaleToZScore"
      ],
      "metadata": {
        "id": "y-B7OZ0lyeee"
      },
      "execution_count": null,
      "outputs": []
    },
    {
      "cell_type": "code",
      "source": [
        "artifact_location_scale_to_01 = tempfile.mkdtemp(prefix='scale_to_01_')\n",
        "artifact_location_scale_to_zscore = tempfile.mkdtemp(prefix='scale_to_zscore_')\n",
        "artifact_location_scale_by_min_max = tempfile.mkdtemp(prefix='scale_by_min_max_')"
      ],
      "metadata": {
        "id": "UyBji8pSUEJt"
      },
      "execution_count": null,
      "outputs": []
    },
    {
      "cell_type": "code",
      "source": [
        "# data used in MLTransform's write mode\n",
        "data = [\n",
        "    {'int_feature_1' : 11, 'int_feature_2': -10},\n",
        "    {'int_feature_1': 34, 'int_feature_2': -33},\n",
        "    {'int_feature_1': 5, 'int_feature_2': -63},\n",
        "    {'int_feature_1': 12, 'int_feature_2': -38},\n",
        "    {'int_feature_1': 32, 'int_feature_2': -65},\n",
        "    {'int_feature_1': 63, 'int_feature_2': -21},\n",
        "]\n",
        "\n",
        "# data used in MLTransform's read mode\n",
        "test_data = [\n",
        "    {'int_feature_1': 29, 'int_feature_2': -20},\n",
        "    {'int_feature_1': -5, 'int_feature_2': -11},\n",
        "    {'int_feature_1': 5, 'int_feature_2': -44},\n",
        "    {'int_feature_1': 29, 'int_feature_2': -12},\n",
        "    {'int_feature_1': 20, 'int_feature_2': -53},\n",
        "    {'int_feature_1': 70, 'int_feature_2': -8}\n",
        "]\n"
      ],
      "metadata": {
        "id": "lNG0WI7Dy1dd"
      },
      "execution_count": null,
      "outputs": []
    },
    {
      "cell_type": "markdown",
      "source": [
        "## Scale the data between 0 and 1\n",
        "\n",
        "Scale the data so that it's in the range of 0 to 1. To scale the data, the transform calculates minimum and maximum values on the whole dataset, and then performs the following calculation:\n",
        "\n",
        "`x = (x - x_min) / (x_max - x_min)`\n",
        "\n",
        "To scale the data, use the [ScaleTo01](https://beam.apache.org/releases/pydoc/current/apache_beam.ml.transforms.tft.html#apache_beam.ml.transforms.tft.ScaleTo01) data processing transform in `MLTransform`."
      ],
      "metadata": {
        "id": "MjMPgB1T0Wzu"
      }
    },
    {
      "cell_type": "code",
      "source": [
        "# MLTransform in write mode.\n",
        "with beam.Pipeline() as pipeline:\n",
        "  data_pcoll = pipeline | \"CreateData\" >> beam.Create(data)\n",
        "\n",
        "  transformed_pcoll = (\n",
        "      data_pcoll\n",
        "      | \"MLTransform\" >> MLTransform(write_artifact_location=artifact_location_scale_to_01).with_transform(\n",
        "          ScaleTo01(columns=['int_feature_1', 'int_feature_2'])\n",
        "      )\n",
        "  )\n",
        "  transformed_pcoll | \"Print\" >> beam.Map(print)"
      ],
      "metadata": {
        "colab": {
          "base_uri": "https://localhost:8080/",
          "height": 211
        },
        "id": "xuoLla2u0Cf_",
        "outputId": "dab7f024-ca5e-4f2b-83b8-c09612c19450"
      },
      "execution_count": null,
      "outputs": [
        {
          "output_type": "stream",
          "name": "stdout",
          "text": [
            "Row(int_feature_1=array([0.10344828], dtype=float32), int_feature_2=array([1.], dtype=float32))\n",
            "Row(int_feature_1=array([0.5], dtype=float32), int_feature_2=array([0.58181816], dtype=float32))\n",
            "Row(int_feature_1=array([0.], dtype=float32), int_feature_2=array([0.03636364], dtype=float32))\n",
            "Row(int_feature_1=array([0.12068965], dtype=float32), int_feature_2=array([0.4909091], dtype=float32))\n",
            "Row(int_feature_1=array([0.46551725], dtype=float32), int_feature_2=array([0.], dtype=float32))\n",
            "Row(int_feature_1=array([1.], dtype=float32), int_feature_2=array([0.8], dtype=float32))\n"
          ]
        }
      ]
    },
    {
      "cell_type": "markdown",
      "source": [
        "In this dataset, the following are the minimum and maximum values for the columns:\n",
        "* `int_feature_1`: 5 and 63\n",
        "* `int_feature_2`: -65 and -10\n",
        "\n",
        "In the output for the column `int_feature_1`, the data is scaled between 0 and 1 by using the values `5` and `63`. `5` is scaled to `0`, and `63` is scaled to `1`. The remaining values are scaled between 0 and 1 by using the formula `x = (x - x_min) / (x_max - x_min)`."
      ],
      "metadata": {
        "id": "Hwvh4UyI23pz"
      }
    },
    {
      "cell_type": "code",
      "source": [
        "# MLTransform in read mode\n",
        "with beam.Pipeline() as pipeline:\n",
        "  data_pcoll = pipeline | \"CreateData\" >> beam.Create(test_data)\n",
        "\n",
        "  transformed_pcoll = (\n",
        "      data_pcoll\n",
        "      | \"MLTransform\" >> MLTransform(read_artifact_location=artifact_location_scale_to_01)\n",
        "  )\n",
        "  transformed_pcoll | \"Print\" >> beam.Map(print)"
      ],
      "metadata": {
        "id": "M8obCCCL1TM6",
        "colab": {
          "base_uri": "https://localhost:8080/"
        },
        "outputId": "813bf5c2-6be7-49d3-d622-33f3f66fe2c9"
      },
      "execution_count": null,
      "outputs": [
        {
          "output_type": "stream",
          "name": "stdout",
          "text": [
            "Row(int_feature_1=array([0.41379312], dtype=float32), int_feature_2=array([0.8181818], dtype=float32))\n",
            "Row(int_feature_1=array([-0.1724138], dtype=float32), int_feature_2=array([0.9818182], dtype=float32))\n",
            "Row(int_feature_1=array([0.], dtype=float32), int_feature_2=array([0.38181818], dtype=float32))\n",
            "Row(int_feature_1=array([0.41379312], dtype=float32), int_feature_2=array([0.96363634], dtype=float32))\n",
            "Row(int_feature_1=array([0.25862068], dtype=float32), int_feature_2=array([0.21818182], dtype=float32))\n",
            "Row(int_feature_1=array([1.1206896], dtype=float32), int_feature_2=array([1.0363636], dtype=float32))\n"
          ]
        }
      ]
    },
    {
      "cell_type": "markdown",
      "source": [
        "`MLTransform` learned in `write` mode that `int_feature_1` ranges from 5 to 63.\n",
        "\n",
        "In `read` mode, when it encounters 29 in `test_data` for `int_feature_1`, it scales it by using the following formula:\n",
        "```\n",
        "(value - min) / (max - min)\n",
        "```\n",
        "\n",
        "The following calculation shows the formula with the values:\n",
        "```\n",
        "(29 - 5) / (63 - 5) = 0.41379312\n",
        "```\n",
        "\n",
        "Twenty-nine is scaled based on the minimum and maximum values generated in `write` mode.\n"
      ],
      "metadata": {
        "id": "jwSiMoAH4vq5"
      }
    },
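    {
      "cell_type": "markdown",
      "source": [
        "The read-mode calculation above can be checked outside of a pipeline. The following is a minimal NumPy sketch, not how `MLTransform` is implemented. The names `train`, `test`, `x_min`, and `x_max` are illustrative, and the values are the `int_feature_1` column of `data` and `test_data`.\n",
        "\n",
        "```python\n",
        "import numpy as np\n",
        "\n",
        "# Values of int_feature_1 in `data` (used in write mode).\n",
        "train = np.array([11, 34, 5, 12, 32, 63], dtype=np.float32)\n",
        "# Values of int_feature_1 in `test_data` (used in read mode).\n",
        "test = np.array([29, -5, 5, 29, 20, 70], dtype=np.float32)\n",
        "\n",
        "# Write step: compute the statistics once, on the training data only.\n",
        "x_min, x_max = train.min(), train.max()\n",
        "\n",
        "# Read step: reuse the stored statistics on the unseen data.\n",
        "scaled = (test - x_min) / (x_max - x_min)\n",
        "print(scaled)\n",
        "```\n",
        "\n",
        "Because the statistics come from the training data, test values below 5 or above 63 fall outside the range of 0 to 1, as the `-0.1724138` and `1.1206896` entries in the read-mode output show."
      ],
      "metadata": {}
    },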
    {
      "cell_type": "markdown",
      "source": [
        "## Scale by using the z-score\n",
        "\n",
        "Similar to `ScaleTo01`, use [ScaleToZScore](https://beam.apache.org/releases/pydoc/current/apache_beam.ml.transforms.tft.html#apache_beam.ml.transforms.tft.ScaleToZScore) to scale the values by using the [z-score](https://www.tensorflow.org/tfx/transform/api_docs/python/tft/scale_to_z_score#:~:text=Scaling%20to%20z%2Dscore%20subtracts%20out%20the%20mean%20and%20divides%20by%20standard%20deviation.%20Note%20that%20the%20standard%20deviation%20computed%20here%20is%20based%20on%20the%20biased%20variance%20(0%20delta%20degrees%20of%20freedom)%2C%20as%20computed%20by%20analyzers.var).\n"
      ],
      "metadata": {
        "id": "VH1CmCGm_PtS"
      }
    },
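    {
      "cell_type": "markdown",
      "source": [
        "The linked TensorFlow Transform documentation describes the z-score as subtracting the mean and dividing by the standard deviation, where the standard deviation is based on the biased variance (0 delta degrees of freedom). The following minimal NumPy sketch illustrates that calculation on the `int_feature_1` column. It is not the `ScaleToZScore` implementation, and the names `train`, `mean`, `std`, `z_train`, and `z_test` are illustrative.\n",
        "\n",
        "```python\n",
        "import numpy as np\n",
        "\n",
        "# Values of int_feature_1 in `data` (used in write mode).\n",
        "train = np.array([11, 34, 5, 12, 32, 63], dtype=np.float64)\n",
        "\n",
        "# Statistics computed on the training data only.\n",
        "mean = train.mean()\n",
        "std = train.std()  # ddof=0 by default, which gives the biased (population) estimate\n",
        "\n",
        "# Write mode: scale the training values.\n",
        "z_train = (train - mean) / std\n",
        "\n",
        "# Read mode: reuse the same mean and standard deviation on an unseen value.\n",
        "z_test = (29 - mean) / std\n",
        "\n",
        "print(z_train)\n",
        "print(z_test)\n",
        "```\n",
        "\n",
        "The results should agree, up to `float32` rounding, with the `int_feature_1` values that the write-mode and read-mode cells in this section print."
      ],
      "metadata": {}
    },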
    {
      "cell_type": "code",
      "source": [
        "# MLTransform in write mode\n",
        "with beam.Pipeline() as pipeline:\n",
        "  data_pcoll = pipeline | \"CreateData\" >> beam.Create(data)\n",
        "\n",
        "  transformed_pcoll = (\n",
        "      data_pcoll\n",
        "      | \"MLTransform\" >> MLTransform(write_artifact_location=artifact_location_scale_to_zscore).with_transform(\n",
        "          ScaleToZScore(columns=['int_feature_1', 'int_feature_2'])\n",
        "      )\n",
        "  )\n",
        "  transformed_pcoll | \"Print\" >> beam.Map(print)"
      ],
      "metadata": {
        "colab": {
          "base_uri": "https://localhost:8080/"
        },
        "id": "OOrnLBbD_Qe4",
        "outputId": "4754b0ac-dc40-4a55-c999-33270e785acc"
      },
      "execution_count": null,
      "outputs": [
        {
          "output_type": "stream",
          "name": "stdout",
          "text": [
            "Row(int_feature_1=array([-0.76950264], dtype=float32), int_feature_2=array([1.401755], dtype=float32))\n",
            "Row(int_feature_1=array([0.3974355], dtype=float32), int_feature_2=array([0.2638597], dtype=float32))\n",
            "Row(int_feature_1=array([-1.0739213], dtype=float32), int_feature_2=array([-1.2203515], dtype=float32))\n",
            "Row(int_feature_1=array([-0.7187662], dtype=float32), int_feature_2=array([0.01649117], dtype=float32))\n",
            "Row(int_feature_1=array([0.2959626], dtype=float32), int_feature_2=array([-1.3192989], dtype=float32))\n",
            "Row(int_feature_1=array([1.8687923], dtype=float32), int_feature_2=array([0.8575442], dtype=float32))\n"
          ]
        }
      ]
    },
    {
      "cell_type": "code",
      "source": [
        "# MLTransform in read mode\n",
        "with beam.Pipeline() as pipeline:\n",
        "  data_pcoll = pipeline | \"CreateData\" >> beam.Create(test_data)\n",
        "\n",
        "  transformed_pcoll = (\n",
        "      data_pcoll\n",
        "      | \"MLTransform\" >> MLTransform(read_artifact_location=artifact_location_scale_to_zscore)\n",
        "  )\n",
        "  transformed_pcoll | \"Print\" >> beam.Map(print)"
      ],
      "metadata": {
        "colab": {
          "base_uri": "https://localhost:8080/"
        },
        "id": "lLXGBaOtCRv-",
        "outputId": "3ef4d142-0be2-4350-b2c4-3b676be74cee"
      },
      "execution_count": null,
      "outputs": [
        {
          "output_type": "stream",
          "name": "stdout",
          "text": [
            "Row(int_feature_1=array([0.14375328], dtype=float32), int_feature_2=array([0.9070179], dtype=float32))\n",
            "Row(int_feature_1=array([-1.5812857], dtype=float32), int_feature_2=array([1.3522812], dtype=float32))\n",
            "Row(int_feature_1=array([-1.0739213], dtype=float32), int_feature_2=array([-0.28035107], dtype=float32))\n",
            "Row(int_feature_1=array([0.14375328], dtype=float32), int_feature_2=array([1.3028076], dtype=float32))\n",
            "Row(int_feature_1=array([-0.31287467], dtype=float32), int_feature_2=array([-0.7256144], dtype=float32))\n",
            "Row(int_feature_1=array([2.2239475], dtype=float32), int_feature_2=array([1.5007024], dtype=float32))\n"
          ]
        }
      ]
    },
    {
      "cell_type": "markdown",
      "source": [
        "## Scale by using ScaleByMinMax\n",
        "\n",
        "Use [ScaleByMinMax](https://github.com/apache/beam/blob/9e8a310f0c0faddfba28176df5893d8ad8fd10a0/sdks/python/apache_beam/ml/transforms/tft.py#L450) to scale your data into the range of `[min_value, max_value]`."
      ],
      "metadata": {
        "id": "McISy8QkC00v"
      }
    },
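    {
      "cell_type": "markdown",
      "source": [
        "As a rough illustration, the following NumPy sketch assumes that the scaling first maps each value to the range of 0 to 1 by using the minimum and maximum of the dataset, and then stretches that result linearly into `[min_value, max_value]`. This assumption is consistent with the values that the write-mode cell in this section prints, but the sketch is not the `ScaleByMinMax` implementation. The names `train`, `x_min`, `x_max`, `unit`, and `scaled` are illustrative.\n",
        "\n",
        "```python\n",
        "import numpy as np\n",
        "\n",
        "# Values of int_feature_1 in `data` (used in write mode).\n",
        "train = np.array([11, 34, 5, 12, 32, 63], dtype=np.float64)\n",
        "\n",
        "# Target range, matching the parameters used in this section.\n",
        "min_value, max_value = 1, 10\n",
        "\n",
        "# Minimum and maximum of the dataset.\n",
        "x_min, x_max = train.min(), train.max()\n",
        "\n",
        "# Map to the range of 0 to 1, then stretch into [min_value, max_value].\n",
        "unit = (train - x_min) / (x_max - x_min)\n",
        "scaled = unit * (max_value - min_value) + min_value\n",
        "print(scaled)\n",
        "```\n",
        "\n",
        "With these inputs, the smallest value, 5, maps to `min_value` and the largest value, 63, maps to `max_value`."
      ],
      "metadata": {}
    },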
    {
      "cell_type": "code",
      "source": [
        "min_value = 1\n",
        "max_value = 10\n",
        "\n",
        "# MLTransform in write mode\n",
        "with beam.Pipeline() as pipeline:\n",
        "  data_pcoll = pipeline | \"CreateData\" >> beam.Create(data)\n",
        "\n",
        "  transformed_pcoll = (\n",
        "      data_pcoll\n",
        "      | \"MLTransform\" >> MLTransform(write_artifact_location=artifact_location_scale_by_min_max).with_transform(\n",
        "          ScaleByMinMax(columns=['int_feature_1', 'int_feature_2'], min_value=min_value, max_value=max_value)\n",
        "      )\n",
        "  )\n",
        "  transformed_pcoll | \"Print\" >> beam.Map(print)"
      ],
      "metadata": {
        "colab": {
          "base_uri": "https://localhost:8080/"
        },
        "id": "cpn_wKQ3Cxo0",
        "outputId": "20be1e2b-36a2-4bae-dc53-5e5b5f1692be"
      },
      "execution_count": null,
      "outputs": [
        {
          "output_type": "stream",
          "name": "stdout",
          "text": [
            "Row(int_feature_1=array([1.9310346], dtype=float32), int_feature_2=array([10.], dtype=float32))\n",
            "Row(int_feature_1=array([5.5], dtype=float32), int_feature_2=array([6.2363634], dtype=float32))\n",
            "Row(int_feature_1=array([1.], dtype=float32), int_feature_2=array([1.3272727], dtype=float32))\n",
            "Row(int_feature_1=array([2.086207], dtype=float32), int_feature_2=array([5.418182], dtype=float32))\n",
            "Row(int_feature_1=array([5.1896553], dtype=float32), int_feature_2=array([1.], dtype=float32))\n",
            "Row(int_feature_1=array([10.], dtype=float32), int_feature_2=array([8.200001], dtype=float32))\n"
          ]
        }
      ]
    },
    {
      "cell_type": "code",
      "source": [
        "# MLTransform in read mode\n",
        "with beam.Pipeline() as pipeline:\n",
        "  data_pcoll = pipeline | \"CreateData\" >> beam.Create(test_data)\n",
        "\n",
        "  transformed_pcoll = (\n",
        "      data_pcoll\n",
        "      | \"MLTransform\" >> MLTransform(read_artifact_location=artifact_location_scale_by_min_max)\n",
        "  )\n",
        "  transformed_pcoll | \"Print\" >> beam.Map(print)"
      ],
      "metadata": {
        "colab": {
          "base_uri": "https://localhost:8080/"
        },
        "id": "3aQjFPzBI7dS",
        "outputId": "e9967889-27eb-4295-dfad-a285d670826c"
      },
      "execution_count": null,
      "outputs": [
        {
          "output_type": "stream",
          "name": "stdout",
          "text": [
            "Row(int_feature_1=array([4.7241383], dtype=float32), int_feature_2=array([8.363636], dtype=float32))\n",
            "Row(int_feature_1=array([-0.5517242], dtype=float32), int_feature_2=array([9.836364], dtype=float32))\n",
            "Row(int_feature_1=array([1.], dtype=float32), int_feature_2=array([4.4363637], dtype=float32))\n",
            "Row(int_feature_1=array([4.7241383], dtype=float32), int_feature_2=array([9.672727], dtype=float32))\n",
            "Row(int_feature_1=array([3.3275862], dtype=float32), int_feature_2=array([2.9636364], dtype=float32))\n",
            "Row(int_feature_1=array([11.086206], dtype=float32), int_feature_2=array([10.327272], dtype=float32))\n"
          ]
        }
      ]
    }
  ]
}
